package com.mxd.flink.connector;

import com.mxd.flink.connector.config.RedisOptions;
import com.mxd.flink.connector.operator.RedisOperator;
import com.mxd.flink.connector.operator.RedisOperators;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

/**
 * Manual integration tests for the Redis SQL connector used as a sink.
 *
 * <p>Every test runs the same pipeline: rows are read from the Redis list {@code students},
 * enriched with {@code class_name} through a lookup join against the {@code classes} table,
 * and written to a {@code sink_students} table whose Redis command differs per test.
 *
 * <p>The tests need a reachable Redis server. The connection defaults to the values the
 * original author used and can be overridden with the system properties
 * {@code test.redis.host}, {@code test.redis.port}, {@code test.redis.password} and
 * {@code test.redis.database}.
 *
 * <p>The tests contain no assertions; they only run each job to completion and print the
 * elapsed time.
 *
 * @author rongdi
 * @date 2022/9/16 15:08
 */
//@Ignore
public class SinkSQL {

    private static final String REDIS_HOST = System.getProperty("test.redis.host", "10.201.0.33");
    private static final int REDIS_PORT = Integer.getInteger("test.redis.port", 6379);
    private static final String REDIS_PASSWORD = System.getProperty("test.redis.password", "123456");
    private static final int REDIS_DATABASE = Integer.getInteger("test.redis.database", 0);

    /** Redis list that holds the student JSON documents consumed by the source table. */
    private static final String STUDENTS_KEY = "students";

    private static final int STUDENT_COUNT = 1000;
    private static final int CLASS_COUNT = 10;
    private static final int SCHOOL_COUNT = 3;

    /** Sink column declarations with {@code number} as the first column. */
    private static final String COLUMNS_NUMBER_FIRST =
            "    number  BIGINT ,\n"
                    + "    name  string,\n"
                    + "    school   string, \n"
                    + "    class_id   BIGINT, \n"
                    + "    class_name   string";

    /** Sink column declarations with {@code school} as the first column. */
    private static final String COLUMNS_SCHOOL_FIRST =
            "    school   string, \n"
                    + "    number  BIGINT ,\n"
                    + "    name  string,\n"
                    + "    class_id   BIGINT, \n"
                    + "    class_name   string";

    /** Select list matching {@link #COLUMNS_NUMBER_FIRST}. */
    private static final String SELECT_NUMBER_FIRST =
            "s.number,s.name,s.school,s.class_id,d.class_name";

    /** Select list matching {@link #COLUMNS_SCHOOL_FIRST}. */
    private static final String SELECT_SCHOOL_FIRST =
            "s.school,s.number,s.name,s.class_id,d.class_name";

    /**
     * Seeds Redis with the fixture data used by every test: the student list, one string key
     * per class id, and one hash per school mapping class id to class name.
     */
    @Before
    public void init() {
        RedisOptions.IS_TEST = true;
        RedisOperator redisOperator = newRedisOperator();

        // Student documents, pushed to the source list in a single call.
        List<String> students = new ArrayList<>(STUDENT_COUNT);
        for (int i = 0; i < STUDENT_COUNT; i++) {
            students.add(studentJson(i));
        }
        redisOperator.rpush(STUDENTS_KEY, students);

        // Class data: plain string keys "1".."10".
        for (int classId = 1; classId <= CLASS_COUNT; classId++) {
            redisOperator.set(String.valueOf(classId), "银河" + classId + "班");
        }

        // School/class data: one hash per school, field = class id, value = class name.
        for (int school = 1; school <= SCHOOL_COUNT; school++) {
            for (int classId = 1; classId <= CLASS_COUNT; classId++) {
                redisOperator.hset("学校" + school, String.valueOf(classId), "银河" + classId + "班");
            }
        }
    }

    /**
     * Sink command LPUSH into the list {@code sink_students_list}.
     *
     * <p>Author's notes: no primary key is declared because this command only supports INSERT
     * semantics; {@code sink.parallelism} is not set, so it defaults to the number of cores.
     */
    @Test
    public void testLPushSQL() throws Exception {
        String sink = sinkDdl(COLUMNS_NUMBER_FIRST, null, "LPUSH",
                "'key'='sink_students_list'");
        runEnrichmentJob(sink, SELECT_NUMBER_FIRST);
    }

    /**
     * Sink command RPUSH into the list {@code sink_students_list}.
     *
     * <p>Author's notes: no primary key is declared because this command only supports INSERT
     * semantics; {@code sink.parallelism} is 1, which is the slowest setting but is useful when
     * rows must be pushed to the list in strict order.
     */
    @Test
    public void testRPushSQL() throws Exception {
        String sink = sinkDdl(COLUMNS_NUMBER_FIRST, null, "RPUSH",
                "'key'='sink_students_list'",
                "'sink.parallelism' = '1'");
        runEnrichmentJob(sink, SELECT_NUMBER_FIRST);
    }

    /**
     * Sink command SADD into the set {@code sink_students_set}.
     *
     * <p>The sink declares the primary key {@code (school, class_id, class_name)} and runs with
     * {@code sink.parallelism} 16.
     * NOTE(review): the original comment claimed no primary key and default parallelism, which
     * contradicted the DDL; confirm whether the primary key is actually required for SADD.
     */
    @Test
    public void testSADD() throws Exception {
        String sink = sinkDdl(COLUMNS_NUMBER_FIRST, "school,class_id,class_name", "SADD",
                "'key'='sink_students_set'",
                "'sink.parallelism' = '16'");
        runEnrichmentJob(sink, SELECT_NUMBER_FIRST);
    }

    /**
     * Sink command SET.
     *
     * <p>Author's notes: SET needs a key, which is taken from the primary key; several primary
     * key columns are joined with underscores. {@code sink.parallelism} is not set, so it
     * defaults to the number of cores.
     */
    @Test
    public void testSet() throws Exception {
        String sink = sinkDdl(COLUMNS_SCHOOL_FIRST, "school,number", "SET");
        runEnrichmentJob(sink, SELECT_SCHOOL_FIRST);
    }

    /**
     * Sink command HSET without an explicit {@code key} option.
     *
     * <p>Author's notes: HSET needs a key and a field. They are taken in declaration order, the
     * first column as key and the second as field. Because rows are updated a primary key is
     * also needed, ideally those same two columns. {@code sink.key.ttl} sets the TTL of the key
     * in seconds; the default -1 keeps it indefinitely.
     */
    @Test
    public void testHSet() throws Exception {
        String sink = sinkDdl(COLUMNS_SCHOOL_FIRST, "school,number", "HSET",
                "'sink.parallelism' = '16'",
                "'sink.key.ttl' = '300'");
        runEnrichmentJob(sink, SELECT_SCHOOL_FIRST);
    }

    /**
     * Sink command HSET with an explicit {@code key} option ({@code sink_students_hset}).
     *
     * <p>Author's notes: because the key is configured, the concatenated primary key is used as
     * the hash field. {@code sink.key.ttl} sets the TTL of the key in seconds; the default -1
     * keeps it indefinitely.
     */
    @Test
    public void testHSetWithKey() throws Exception {
        String sink = sinkDdl(COLUMNS_SCHOOL_FIRST, "number", "HSET",
                "'key'='sink_students_hset'",
                "'sink.parallelism' = '16'",
                "'sink.key.ttl' = '300'");
        runEnrichmentJob(sink, SELECT_SCHOOL_FIRST);
    }

    /**
     * Removes the fixture data written by {@link #init()}.
     *
     * <p>Only the keys and hash fields that {@code init} created are deleted. Keys written by
     * the sinks under test are not removed here.
     */
    @After
    public void destroy() {
        RedisOperator redisOperator = newRedisOperator();
        redisOperator.del(STUDENTS_KEY);

        // Remove class data.
        for (int classId = 1; classId <= CLASS_COUNT; classId++) {
            redisOperator.del(String.valueOf(classId));
        }

        // Remove school/class data.
        for (int school = 1; school <= SCHOOL_COUNT; school++) {
            for (int classId = 1; classId <= CLASS_COUNT; classId++) {
                redisOperator.hdel("学校" + school, String.valueOf(classId));
            }
        }
    }

    /** Opens a single-node Redis operator using the configured connection settings. */
    private static RedisOperator newRedisOperator() {
        return RedisOperators.getSimple(REDIS_HOST, REDIS_PORT, REDIS_PASSWORD, REDIS_DATABASE);
    }

    /** Builds the JSON document for the student with the given zero-based index. */
    private static String studentJson(int index) {
        return "{\n"
                + "  \"number\": " + index + ",\n"
                + "  \"name\": \"学生" + index + "\",\n"
                + "  \"school\": \"学校" + ((index % SCHOOL_COUNT) + 1) + "\",\n"
                + "  \"class_id\": " + ((index % CLASS_COUNT) + 1) + "\n"
                + "}";
    }

    /** Returns the connector options shared by every table; the result ends with a comma line. */
    private static String connectionOptions() {
        return "  'connector'='redis',\n"
                + "  'host'='" + REDIS_HOST + "', \n"
                + "  'port'='" + REDIS_PORT + "',\n"
                + "  'redis-mode'='single', \n"
                + "  'password'='" + REDIS_PASSWORD + "',\n"
                + "  'database'='" + REDIS_DATABASE + "',\n";
    }

    /** Returns the DDL of the {@code students} source table, read from Redis with BLPOP. */
    private static String sourceDdl() {
        return "CREATE TABLE students\n"
                + "(\n"
                + "    number  BIGINT ,\n"
                + "    name  string,\n"
                + "    school   string, \n"
                + "    class_id   BIGINT, \n"
                + "    proctime as PROCTIME() \n"
                + ") \n"
                + "WITH (\n"
                + connectionOptions()
                + "  'key'='" + STUDENTS_KEY + "',\n"
                + "  'format'='json',\n"
                + "  'batch-fetch-rows'='1000',\n"
                + "  'json.fail-on-missing-field' = 'false',\n"
                + "  'json.ignore-parse-errors' = 'true',\n"
                + "  'command'='BLPOP'\n"
                + " )";
    }

    /** Returns the DDL of the {@code classes} lookup table, read from Redis with HGET. */
    private static String lookupDdl() {
        return "CREATE TABLE classes\n"
                + "(\n"
                + "    school   string, \n"
                + "    class_id  BIGINT   ,\n"
                + "    class_name  string   "
                + ") \n"
                + "WITH (\n"
                + connectionOptions()
                + "  'lookup.cache.max-rows'='1000',\n"
                + "  'lookup.cache.ttl'='3600',\n"
                + "  'lookup.cache.load-all'='true',\n"
                + "  'command'='HGET'\n"
                + " )";
    }

    /**
     * Builds the DDL of the {@code sink_students} table.
     *
     * <p>For debugging, the whole WITH clause can be replaced by {@code 'connector'='print'} to
     * dump the joined rows instead of writing them to Redis.
     *
     * @param columns      column declarations, without a trailing comma
     * @param primaryKey   comma-separated primary key columns, or {@code null} for none
     * @param command      Redis command the sink executes (e.g. {@code LPUSH}, {@code HSET})
     * @param extraOptions additional {@code 'name'='value'} options, one per element
     * @return the complete CREATE TABLE statement
     */
    private static String sinkDdl(String columns, String primaryKey, String command,
                                  String... extraOptions) {
        StringBuilder ddl = new StringBuilder("CREATE TABLE sink_students\n(\n").append(columns);
        if (primaryKey != null) {
            ddl.append(", \n    primary key(").append(primaryKey).append(") not enforced");
        }
        ddl.append("\n) \nWITH (\n")
                .append(connectionOptions())
                .append("  'format'='json',\n")
                .append("  'batch-fetch-rows'='1000',\n")
                .append("  'json.fail-on-missing-field' = 'false',\n")
                .append("  'json.ignore-parse-errors' = 'true',\n");
        for (String option : extraOptions) {
            ddl.append("  ").append(option).append(",\n");
        }
        return ddl.append("  'command'='").append(command).append("'\n )").toString();
    }

    /**
     * Registers the source, lookup and sink tables, runs the enrichment insert and blocks until
     * the job finishes, then prints the elapsed wall-clock time.
     *
     * @param sinkDdl    CREATE TABLE statement for {@code sink_students}
     * @param selectList select list whose column order matches the sink's declared columns
     * @throws Exception if the job cannot be submitted or fails while running
     */
    private static void runEnrichmentJob(String sinkDdl, String selectList) throws Exception {
        long start = System.currentTimeMillis();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings environmentSettings =
                EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);

        tEnv.executeSql(sourceDdl());
        tEnv.executeSql(lookupDdl());
        tEnv.executeSql(sinkDdl);

        String sql =
                " insert into sink_students "
                        + " select " + selectList + "  from students s"
                        + " left join classes for system_time as of s.proctime as d"
                        + " on d.class_id = s.class_id and d.school = s.school";
        TableResult tableResult = tEnv.executeSql(sql);

        // Fail with a clear message instead of NoSuchElementException when no job was started.
        tableResult.getJobClient()
                .orElseThrow(() -> new IllegalStateException("INSERT did not produce a job client"))
                .getJobExecutionResult()
                .get();

        long end = System.currentTimeMillis();
        System.out.println("耗时：" + (end - start) + "ms");
    }

}
